miek
11/18/2023, 9:59 PM
SELECT
    source_name
    , <DIMS>
    , SUM(source_value) AS source_sum
FROM (
    SELECT
        source_name
        , source_value
        , <DIMS>
    FROM <some_table>
    WHERE
        source_name IN (<source_name_list>)
        AND <TIME_WINDOW_FILTER>
)
GROUP BY source_name, <DIMS>
ORDER BY <DIMS>
miek
11/18/2023, 10:01 PM
@resolve to potentially design this, but my biggest issue is that I don't know upfront all the possible values for a given dimension.
Let's say the <DIMS> is stock_symbol: it could be 100's. Does it make sense to create 100's of Hamilton nodes dynamically? (Seems a bit off to me.)
miek
11/18/2023, 10:03 PM
df.groupby(<DIMS>).agg(['sum'])
is that a recommended way?
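(For reference, a minimal pandas sketch of the SQL above; the column names, the dims list, and the function name are placeholders, not from the thread:)

import pandas as pd

def source_sums(some_table: pd.DataFrame, dims: list[str], source_names: list[str]) -> pd.DataFrame:
    # WHERE source_name IN (<source_name_list>) -- the time-window filter would go here too
    filtered = some_table[some_table["source_name"].isin(source_names)]
    # GROUP BY source_name, <DIMS> ... SUM(source_value) ... ORDER BY <DIMS>
    return (
        filtered.groupby(["source_name"] + dims)["source_value"]
        .sum()
        .reset_index(name="source_sum")
        .sort_values(dims)
    )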
miek
11/18/2023, 10:17 PM
<DIMS> = date, stock_ticker and source_name = 'volume' (a column in <some_table> that represents trading volume); then the inner SQL table becomes this (simplifying the IN () clause):
SELECT
    source_name
    , source_value
    , date
    , symbol
FROM <some_table>
WHERE
    source_name = 'volume'
    AND date BETWEEN start_date AND end_date
what I know: source_names, in this case 'volume'
what I do NOT know: how many symbols there will be on a given date
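(A sketch of how Parallelizable + Collect can absorb the unknown symbol count, which is the pattern the thread converges on below; function and column names are illustrative only:)

import pandas as pd
from hamilton.htypes import Parallelizable, Collect

def symbol_df(inner_table: pd.DataFrame) -> Parallelizable[pd.DataFrame]:
    # one branch per symbol, however many happen to exist on a given date
    for _, df in inner_table.groupby("symbol"):
        yield df

def symbol_volume_sum(symbol_df: pd.DataFrame) -> pd.DataFrame:
    return symbol_df.groupby(["source_name", "date"], as_index=False)["source_value"].sum()

def all_volume_sums(symbol_volume_sum: Collect[pd.DataFrame]) -> pd.DataFrame:
    return pd.concat(symbol_volume_sum, ignore_index=True)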
miek
11/18/2023, 10:27 PM
source_name, and even aggregation types (SUM or whatever), at run-time as well
miek
11/18/2023, 10:44 PM
@resolve works at this point.
parameterized_subdag and Parallelizable/Collect are a good tip; this would then be easily parallelizable on ray.io later on
miek
11/18/2023, 10:59 PM
user_id by region/date (but assuming you don't know what the # of regions is at runtime)
Elijah Ben Izzy
11/19/2023, 12:11 AM
(1) Static version using @resolve — do a quick query of the dimensions to groupby beforehand, and pass those in via config. So, one node per aggregation/groupby. small pieces, expressive DAG, static
(2) Dynamic version of (1) — if you really can’t determine this prior to runtime, then use Parallelizable + Collect for each subset that you don’t know. small pieces, expressive DAG, dynamic
(3) A little bit in the middle. The trick here is pretty simple — anything that can be known beforehand should be represented in the DAG (aggregation types, windows, etc.). Anything known after the fact will be represented as columns inside your dataframe. So, the result will be a bunch of dataframes (a subset of which you can choose to query), with, say, regions as columns. Then you have the choice as to how static you want this to be. Do you do a new function for each aggregation type with some parameterization? Or pass it all in? Some options here. But there is a node (or a few, perhaps) for each aggregation/operation, and you’ll probably use @resolve + @parameterize or @parameterized_subdag to determine that (see the sketch after this list). medium pieces, somewhat expressive DAG, static
(4) Big pieces — e.g. dataframes, then apply a series of operations to them, passing in the set of aggregations. Doesn’t naturally solve your problems, but is an easy way to model it if you don’t want to rely exclusively on Hamilton constructs. big pieces, inexpressive DAG, static
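(A minimal sketch of what (3) could look like, assuming pandas and placeholder column names; one @parameterize'd node per aggregation, with the unknown dimension kept as columns rather than nodes:)

import pandas as pd
from hamilton.function_modifiers import parameterize, value

@parameterize(
    volume_sum=dict(agg=value("sum")),
    volume_mean=dict(agg=value("mean")),
)
def volume_aggregation(stock_data: pd.DataFrame, agg: str) -> pd.DataFrame:
    # aggregate per (date, symbol), then pivot so each symbol becomes a column --
    # the dimension we can't enumerate up front never has to become a node
    grouped = stock_data.groupby(["date", "symbol"])["source_value"].agg(agg)
    return grouped.unstack("symbol")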
Presuming I have a good enough view of what you’re trying to do, I’d highly recommend (3). It will:
1. Form a nice readable DAG (caveat — this depends on how you leverage the Hamilton constructs — note on that in a bit)
2. Likely be more performant (a little subtle, but pandas/other libraries are optimized for groupby/apply operations)
3. Allow for some custom data quality checks to ensure something matches your expectations
4. Allow for the computations done to be expressed in the DAG, nicely self-chunked and parallelizable (see point (2))
5. Allow for further grouping (say, perhaps, if you want to aggregate by region + industry or whatever down the road).
Re: point (1), I would highly recommend first doing this with the vanilla python decorators, then building a domain-specific expressive decorator that is just a thin wrapper on top of them. This way you can use the power of the Hamilton decorators but tailor them towards your use-case. A little nuanced on ways to design this right, but I’ll happily point you in the right direction once you’re at this point.
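(A hedged sketch of such a thin domain-specific wrapper; the decorator name and config shape are made up for illustration, and it simply delegates to @parameterize:)

import pandas as pd
from hamilton.function_modifiers import parameterize, value

def aggregations(**configs):
    # each name=config pair becomes a node named `name`, with its
    # `params` argument bound to that config dict
    return parameterize(**{n: dict(params=value(c)) for n, c in configs.items()})

@aggregations(
    close_avg_7d={"agg": "mean", "window": "7d"},
    volume_sum_1d={"agg": "sum", "window": "1d"},
)
def rolling_stat(stock_data: pd.DataFrame, params: dict) -> pd.DataFrame:
    ...  # the actual aggregation, driven by params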
miek
11/19/2023, 1:35 AM
Going the Parallelizable + Collect route right now, and I believe it could work well, though your (3) is also cool.
Let me digest all of the above a bit and then I'll circle back with what I come up with for additional feedback. Thanks again!
miek
11/19/2023, 2:26 AM
# my_hamilton_nodes.py
from hamilton.htypes import Parallelizable, Collect

def url() -> Parallelizable[str]:
    for url_ in ['url_a', 'url_b']:
        yield url_

def url_loaded(url: str) -> str:
    return url

def counts(url_loaded: str) -> int:
    print(type(url_loaded))  # url_loaded seems to be a list not a string???
    return len(url_loaded.split("_"))

def total_words(counts: Collect[int]) -> int:
    return sum(counts)

Is there a particular flag I need to set in the driver such that the input to counts() is actually a string? It seems to be coming in as a list.
I'm using this snippet:
import my_hamilton_nodes
from hamilton import driver, telemetry
from hamilton.execution import executors

telemetry.disable_telemetry()

config = {}
dr = (
    driver.Builder()
    .with_modules(my_hamilton_nodes)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .with_config(config)
    .build()
)
output_columns = [
    'counts'
]
out = dr.execute(output_columns)
print(out)
but it fails with
File "my_path/hamilton-playground/my_hamilton_nodes.py", line 64, in counts
    return len(url_loaded.split("_"))
AttributeError: 'list' object has no attribute 'split'
Elijah Ben Izzy
11/19/2023, 2:39 AM
It's in the Parallel block. counts gets run n times (one for each url). Asking for it as an output is technically “undefined behavior” (we should error out, that is), and I think because there’s no Collect it’s not treating it as such. If you run it with total_words as the output it’ll work.
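(i.e., requesting the collected node rather than the per-item one; against the snippet above this would be:)

out = dr.execute(['total_words'])  # ask for the Collect-ed node, not 'counts'
print(out)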
miek
11/19/2023, 2:41 AM
<class 'str'>
<class 'str'>
{'total_words': 4}
and everything works!
miek
11/19/2023, 6:51 AM
@resolve, that's my next goal...
miek
11/19/2023, 7:44 AM
def ticker_df(stock_data: pd.DataFrame) -> Parallelizable[pd.DataFrame]:
then <many nodes here, all using ticker_df as input, doing various aggregations>
1 func to merge everything back together, likely a @resolve version of
def final_stats(ticker_aggs: Collect[pd.DataFrame]) -> pd.DataFrame:
    df = pd.concat(ticker_aggs, ignore_index=True)
    return df
miek
11/19/2023, 8:32 PM
Seems I can't Collect from the same Parallelizable multiple times: the 1st time it gets processed by Collect it disappears from some cache. Getting this error:
File "[...]/anaconda3/envs/py310/lib/python3.10/site-packages/hamilton/execution/state.py", line 113, in read
    raise KeyError(f"Key {formatted_key} not found in cache")
KeyError: 'Key total_words2 not found in cache'
To reproduce, run this toy example: https://github.com/kzk2000/hamilton-playground/blob/main/toy_run.py
It has two outputs, 'total_words' and 'total_words2', which both try to Collect from the same upstream. Is this not allowed?
# toy_nodes.py
from hamilton.htypes import Parallelizable, Collect

def urls() -> Parallelizable[str]:
    for url_ in ['url_a', 'url_b']:
        yield url_

def counts(urls: str) -> int:
    return len(urls.split("_"))

def total_words(counts: Collect[int]) -> int:
    return sum(counts)

def total_words2(counts: Collect[int]) -> int:
    return sum(counts)
miek
11/19/2023, 8:36 PM
def ticker_df(stock_data: pd.DataFrame) -> Parallelizable[pd.DataFrame]:
    for ticker, df in stock_data.groupby('ticker'):
        yield df
Elijah Ben Izzy
11/19/2023, 8:37 PM
class AggregationInput:
    data: pd.DataFrame
    agg_param_1: ...
    agg_param_2: ...
Which allows you to reuse subdags cleanly. Then you have a set of nodes (or a single node if it's simple) for each parallelizable that listens to it, and join them all after the fact.
Elijah Ben Izzy
miek
11/19/2023, 8:45 PM
class AggregationInput:
    data: pd.DataFrame
    agg_param_1: ...
    agg_param_2: ...
but then for each subdag, I only set exactly 1 of the agg_params, say agg_param_1 is set when I want to do aggregation 1, and agg_param_2 is set when I do aggregation 2.
Is that what you meant?
Elijah Ben Izzy
11/19/2023, 8:57 PM
import dataclasses
from typing import Any, Dict

import pandas as pd
from hamilton.htypes import Parallelizable, Collect

@dataclasses.dataclass
class AggregationArguments:
    df: pd.DataFrame
    group_id: int  # Or str?
    group_name: str
    aggregation_arguments: Dict[str, Any]  # unstructured

def ticker_df_for_rolling_close_bid_computation(stock_data: pd.DataFrame) -> Parallelizable[AggregationArguments]:
    for ticker, df in stock_data.groupby('ticker'):
        yield AggregationArguments(
            df=df,
            group_id=ticker,
            group_name="ticker",
            aggregation_arguments={'type': 'avg', 'window': '7d'})

def windowed_aggregation(ticker_df_for_rolling_close_bid_computation: AggregationArguments) -> pd.DataFrame:
    return ...  # the apply part of the groupby

def all_windowed_aggregations(windowed_aggregation: Collect[pd.DataFrame]) -> pd.DataFrame:
    return pd.concat(...)  # the concat you had above
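(For completeness, a wiring sketch of how this could be driven, mirroring the Builder snippet from earlier in the thread; agg_nodes is a hypothetical module holding the functions above, fleshed out:)

import pandas as pd
from hamilton import driver
from hamilton.execution import executors
import agg_nodes  # hypothetical module containing the functions above

dr = (
    driver.Builder()
    .with_modules(agg_nodes)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .build()
)
# toy input frame; real data would have the columns the aggregations need
stock_data = pd.DataFrame({"ticker": ["A", "A", "B"], "close_bid": [1.0, 2.0, 3.0]})
out = dr.execute(["all_windowed_aggregations"], inputs={"stock_data": stock_data})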
Elijah Ben Izzy
11/19/2023, 9:00 PM
You can use a subdag in the middle if you want to reuse nodes, if you want to express it using Hamilton.
Elijah Ben Izzy
11/19/2023, 9:04 PM
def grouped_by_ticker(data: pd.DataFrame) -> List[Tuple[int, pd.DataFrame]]:
    return list(data.groupby('ticker'))

def ticker_df_for_rolling_close_bid_computation(grouped_by_ticker: List[Tuple[int, pd.DataFrame]]) -> Parallelizable[AggregationArguments]:
    for ticker, df in grouped_by_ticker:
        yield ...
miek
11/19/2023, 9:06 PM
def grouped_by_ticker(data: pd.DataFrame) -> List[Tuple[int, pd.DataFrame]]:
    return list(data.groupby('ticker'))
it would only be computed once in memory?
Elijah Ben Izzy
11/19/2023, 9:09 PM
Yes, it's only computed once (it's outside the Parallelizable function)
Elijah Ben Izzy
11/19/2023, 9:14 PM
1. Keep a global list of aggregations (+ their fixed parameters)
2. A parameterize call with those — each one outputs a specific name
3. Write a function/subdag/something that processes it
4. A collect that mirrors (2)
Then it's two steps:
1. Implement the aggregation subdag
2. Add your aggregation name + fixed parameters to the global list. Dynamic parameters can also be declared, although you’ll have to be clever (this is where you can actually pass in all params/config and filter, or just filter the ones you want inside the parameterization function — lots of ways to do this).
And, what’s cool, is that the user can request just the ones they need (see the sketch below).
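(A hedged sketch of that pattern; the list contents, column names, and windowing logic are hypothetical:)

from typing import Any, Dict
import pandas as pd
from hamilton.function_modifiers import parameterize, value

# (1) global list of aggregations + their fixed parameters
AGGREGATIONS: Dict[str, Dict[str, Any]] = {
    "volume_sum_7d": {"agg": "sum", "window": "7d"},
    "close_mean_30d": {"agg": "mean", "window": "30d"},
}

# (2) one parameterize call driven by the list -- each entry becomes a named node
@parameterize(**{name: dict(params=value(cfg)) for name, cfg in AGGREGATIONS.items()})
def rolling_aggregation(stock_data: pd.DataFrame, params: Dict[str, Any]) -> pd.DataFrame:
    # (3) the processing step -- assumes a datetime 'date' column for the time window
    return (
        stock_data.set_index("date")
        .sort_index()
        .groupby("ticker")["source_value"]
        .rolling(params["window"])
        .agg(params["agg"])
        .reset_index()
    )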
miek
11/20/2023, 3:08 AM
from typing import List
from hamilton.function_modifiers import parameterize, source, value
from hamilton.htypes import Parallelizable, Collect

@parameterize(
    data_agg1=dict(input_values=value(['url10_a', 'url10_b'])),
    data_agg2=dict(input_values=value(['url20_a', 'url20_b'])),
)
def data_node(input_values: List) -> Parallelizable[str]:
    for url_ in input_values:
        yield url_

def agg_1(data_agg1: str) -> int:
    print('**********\nagg_1')
    print(type(data_agg1))  # comes in as generator?
    print(list(data_agg1))
    return len(data_agg1.split("_"))  # type(data_agg1) == generator -> throws

[...]

@parameterize(
    output_agg1=dict(upstream_source=source('agg_1')),
    output_agg2=dict(upstream_source=source('agg_2')),
)
def output_node(upstream_source: Collect[int]) -> int:
    return upstream_source  # for testing, should be concat() ...

When asking for output_agg1, my agg_1 node doesn't receive strings but the whole generator from my data_agg1 node.
Without @parameterize everything works, but when I add it (per our discussion above) things seem to behave differently and I'm not receiving the actual string items from the Parallelizable data_agg1 node.
Any ideas what I'm missing here?
miek
11/20/2023, 3:21 AM
for collector in collectors:
    expander, nodes_in_block = self.nodes_after_last_expand_block(collector)
    # TODO -- add error message for conflicting groups...
    # if expander in visited:
    #     raise ValueError(f"Multiple collect nodes cannot trace "
    #                      f"back to the same expander. ")
    expander_name = f"expand-{expander.name}"
where expander becomes None once you add the @parameterize. But frankly, this low level is a bit beyond my comfort zone, i.e. I don't have enough context on what's happening under the hood.
miek
11/20/2023, 3:24 AM
@parameterize + Parallelizable are somehow stepping on each other